/*
 *文件名： MsgSink.java
 *版权： Copyright by 云天励飞 intellif.com
 *描述： Description
 *创建人：mozping
 *创建时间： 2018/8/13 9:24
 *修改理由：
 *修改内容：
 */
package indi.mozping.kafka.pc;

import com.alibaba.fastjson.JSONObject;
import indi.mozping.utils.MyMsgFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
 * 监听Mq消息
 *
 * @author mozping
 * @version 1.0
 * @date 2018/8/13 9:24
 * @see MqSinkReceiver
 * @since JDK1.8
 */
@EnableBinding(value = {MqSinkI.class})
public class MqSinkReceiver {


    public static final Logger LOG = LoggerFactory.getLogger(MqSinkReceiver.class.getName());

    @Autowired
    MyMsgFilter myMsgFilter;


    @StreamListener(MqSinkI.INPUT_CHANNEL)
    public void messageListen1(Message<JSONObject> message) throws Exception {
        MessageHeaders messageHeaders = message.getHeaders();
        JSONObject payload = message.getPayload();


        long offset = (long) messageHeaders.get("kafka_offset");
        LOG.info("消费成功" + "，偏移量:" + offset + ", 消息内容:" + message.getPayload());
        Thread.sleep(5 * 1000);
    }

    @StreamListener(MqSinkI.INPUT_CHANNEL)
    public void messageListen(Message<JSONObject> message) throws Exception {

        boolean isHealth = true;
        Acknowledgment acknowledgmentk = (Acknowledgment) message.getHeaders().get("kafka_acknowledgment");

        try {
            if (isHealth) {
                String msgData = message.getPayload().toString();
                String msgHead = message.getHeaders().toString();
                System.out.println("消费成功" + message.toString());
                acknowledgmentk.acknowledge();
            }
        } catch (Exception ex) {
            isHealth = false;
            MessageHeaders messageHeaders = message.getHeaders();
            int offset = (int) messageHeaders.get("kafka_offset");
            acknowledgmentk.acknowledge();
        }

        Thread.sleep(5 * 1000);
    }
}